from pyspark.sql import SparkSession,DataFrame

class BestFilmsByOverallRating:
    """Find the top-10 best-rated movies among those with at least 5000 ratings."""

    def run(self, moviesDataSet: DataFrame, ratingsDataSet: DataFrame, spark: SparkSession) -> DataFrame:
        """Return a DataFrame of (movieId, title, avgRating) for the top-10 movies.

        Args:
            moviesDataSet: movies data; must contain ``movieId`` and ``title`` columns.
            ratingsDataSet: ratings data; must contain ``movieId`` and ``rating`` columns.
            spark: active SparkSession used to execute the SQL.

        Returns:
            DataFrame with columns ``movieId``, ``title``, ``avgRating``,
            sorted by average rating descending.
        """
        # Register the DataFrames as temp views so they can be queried with SQL.
        moviesDataSet.createOrReplaceTempView("movies")
        ratingsDataSet.createOrReplaceTempView("ratings")

        # rating_cnt: keep only movies with >= 5000 ratings (popularity floor).
        # rating_score: pick the 10 highest average ratings among those.
        # Final SELECT needs its own ORDER BY: the ordering inside a CTE is
        # not guaranteed to survive the join, so sort the joined result explicitly.
        sql = """
            with rating_cnt as(
                select movieId,count(1) as rating_cnt,avg(rating) as avg_rating
                from ratings group by movieId
                having count(1)>=5000
            ),rating_score as(
                select movieId,avg_rating
                from rating_cnt order by avg_rating desc
                limit 10
            )
            select m.movieId,m.title,r.avg_rating as avgRating
            from rating_score r inner join movies m on r.movieId=m.movieId
            order by r.avg_rating desc
        """

        resultDS = spark.sql(sql)
        return resultDS
